package cn.rdtimes.wolfdsp.core.job;

import cn.rdtimes.wolfdsp.core.data.TaskInstance;
import cn.rdtimes.wolfdsp.core.job.params.DefaultParameters;
import cn.rdtimes.wolfdsp.core.job.params.Parameters;
import cn.rdtimes.wolfdsp.core.util.StringUtil;
import com.alibaba.fastjson.JSON;

import java.util.Arrays;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Future;

/**
 * A task instance as it is being executed by the scheduler.
 *
 * <p>Wraps a {@link TaskInstance} together with its parsed input/output parameters,
 * the number of pre-tasks it waits for, the indexes of its direct child tasks and
 * bookkeeping about where and how it is currently running.
 *
 * @author BZ
 */
class PerformTaskInstance {
    private final TaskInstance taskInstance;
    private final Parameters<String, Object> inputParams = new DefaultParameters(Parameters.ParameterKind.TASK);
    private final Parameters<String, Object> outputParams = new DefaultParameters(Parameters.ParameterKind.TASK);

    // Number of pre-tasks, taken from the comma separated pre-task id list.
    private final int preTaskCount;
    // Once this reaches preTaskCount the pre-tasks are done and this instance may run.
    // Guarded by the monitor of this object.
    private int completePreTaskCount;

    // Indexes of the direct (child) tasks. The array is never modified after it has been
    // assigned: addChildTask builds a new array and then replaces the reference, and the
    // field is volatile so that the lock-free getChildTaskCount sees the latest array.
    private volatile int[] childTaskIndexes = new int[0];
    // Index of this task inside PerformJobGraphInstance.
    private volatile int taskIndex = -1;
    // May be null: it is null when a task instance is re-run on its own.
    private final PerformJobGraphInstance pJobInstance;

    // Timestamp used when polling for status.
    private volatile long timestamp = System.currentTimeMillis();
    // IP and port of the node the task was dispatched to.
    private volatile String nodeIp;
    private volatile int nodePort;
    // Handle of the running task.
    private volatile Future<?> future;

    /**
     * Creates an instance that does not belong to a job graph instance.
     *
     * @param instance the task instance to wrap
     */
    PerformTaskInstance(TaskInstance instance) {
        this(null, instance);
    }

    /**
     * Creates an instance, parsing the input/output parameter JSON of the task (when
     * present) and deriving the pre-task count from its pre-task id list.
     *
     * @param pJobInstance the owning job graph instance, may be {@code null}
     * @param instance     the task instance to wrap
     */
    PerformTaskInstance(PerformJobGraphInstance pJobInstance, TaskInstance instance) {
        this.taskInstance = instance;
        this.pJobInstance = pJobInstance;

        String inputJson = instance.getInputParamJson();
        if (!StringUtil.isEmpty(inputJson)) {
            inputParams.parseJson(inputJson);
        }
        String outputJson = instance.getOutputParamJson();
        if (!StringUtil.isEmpty(outputJson)) {
            outputParams.parseJson(outputJson);
        }

        // One pre-task per comma separated id; an empty list means there are none.
        String preTaskIds = instance.getPreTaskIds();
        preTaskCount = StringUtil.isEmpty(preTaskIds) ? 0 : preTaskIds.split(",").length;
    }

    /**
     * Registers {@code instance} as a direct child by recording its current task index.
     *
     * @param instance the child task; its task index is read at the time of the call
     */
    final synchronized void addChildTask(PerformTaskInstance instance) {
        int[] current = childTaskIndexes;
        int[] grown = Arrays.copyOf(current, current.length + 1);
        grown[current.length] = instance.getTaskIndex();
        // Publish only after the new slot is filled so readers never see a half-built array.
        childTaskIndexes = grown;
    }

    /**
     * Records that one more pre-task of this instance has completed and reports whether
     * the number of completed pre-tasks has reached the required number. Usually called
     * by the pre-task. The counter is not incremented once the required number is reached.
     *
     * @return true when the completed pre-task count is at least the pre-task count
     */
    final synchronized boolean incPreTaskCountIfRunChild() {
        if (completePreTaskCount >= preTaskCount) {
            return true;
        }
        completePreTaskCount++;
        return completePreTaskCount >= preTaskCount;
    }

    /**
     * @return the number of registered child tasks
     */
    final int getChildTaskCount() {
        return childTaskIndexes.length;
    }

    /**
     * Resolves the registered child indexes against the task array of the owning job
     * graph instance.
     *
     * @return the child tasks, or an empty array when there is no owning job graph
     *         instance or no child has been registered
     */
    final synchronized PerformTaskInstance[] getChildTask() {
        int[] indexes = childTaskIndexes;
        if (pJobInstance == null || indexes.length == 0) {
            return new PerformTaskInstance[0];
        }

        // Fetch the task array once so that every child is looked up in the same array.
        PerformTaskInstance[] allTasks = pJobInstance.getAllTasks();
        PerformTaskInstance[] children = new PerformTaskInstance[indexes.length];
        for (int i = 0; i < indexes.length; i++) {
            children[i] = allTasks[indexes[i]];
        }
        return children;
    }

    final String getTaskId() {
        return taskInstance.getTaskId();
    }

    final String getInstanceId() {
        return taskInstance.getInstanceId();
    }

    final TaskInstance getTaskInstance() {
        return taskInstance;
    }

    Parameters<String, Object> getInputParams() {
        return inputParams;
    }

    Parameters<String, Object> getOutputParams() {
        return outputParams;
    }

    /**
     * @return the owning job graph instance, may be {@code null}
     */
    PerformJobGraphInstance getPerformJobGraphInstance() {
        return pJobInstance;
    }

    /**
     * Puts every entry of {@code params} into the input parameters.
     *
     * @param params entries to merge; {@code null} or empty is a no-op
     */
    final void mergeInputParams(Map<String, Object> params) {
        putAllInto(inputParams, params);
    }

    /**
     * Parses {@code jsonParams} as a JSON object and puts its entries into the output
     * parameters.
     *
     * @param jsonParams JSON text; an empty value (per {@code StringUtil.isEmpty}) is a no-op
     */
    final void mergeOutputParams(String jsonParams) {
        if (StringUtil.isEmpty(jsonParams)) return;
        mergeOutputParams(JSON.parseObject(jsonParams));
    }

    /**
     * Puts every entry of {@code params} into the output parameters.
     *
     * @param params entries to merge; {@code null} or empty is a no-op
     */
    final void mergeOutputParams(Map<String, Object> params) {
        putAllInto(outputParams, params);
    }

    // Copies each entry of source into target; does nothing for a null or empty source.
    private static void putAllInto(Parameters<String, Object> target, Map<String, Object> source) {
        if (source == null || source.isEmpty()) {
            return;
        }
        for (Map.Entry<String, Object> entry : source.entrySet()) {
            target.put(entry.getKey(), entry.getValue());
        }
    }

    /**
     * Checks whether at least {@code interval} milliseconds have passed since the stored
     * timestamp. When they have, the timestamp is reset to the current time.
     *
     * @param interval the interval in milliseconds
     * @return true when the interval has elapsed
     */
    final boolean isIdleTimeout(int interval) {
        // Read the clock once so the stored timestamp is the instant that was compared.
        long now = System.currentTimeMillis();
        boolean elapsed = interval <= (now - timestamp);
        if (elapsed) {
            timestamp = now;
        }
        return elapsed;
    }

    int getTaskIndex() {
        return taskIndex;
    }

    void setTaskIndex(int taskIndex) {
        this.taskIndex = taskIndex;
    }

    long getTimestamp() {
        return timestamp;
    }

    void setTimestamp(long timestamp) {
        this.timestamp = timestamp;
    }

    String getNodeIp() {
        return nodeIp;
    }

    void setNodeIp(String nodeIp) {
        this.nodeIp = nodeIp;
    }

    int getNodePort() {
        return nodePort;
    }

    void setNodePort(int nodePort) {
        this.nodePort = nodePort;
    }

    public Future<?> getFuture() {
        return future;
    }

    public void setFuture(Future<?> future) {
        this.future = future;
    }

}
